/*
 * @Author: lokei
 * @Date: 2022-09-21 21:06:22
 * @LastEditors: lokei
 * @LastEditTime: 2023-10-24 20:23:14
 * @Description: 
 */
package cn.lokei.task.handler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class RedisStreamPendingHandler extends Thread {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    @Scheduled(fixedDelay = 60 * 1000)
    public void run() {
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();

        List<String> key_list = new ArrayList<String>();
        key_list.add("smt_print");
        key_list.add("send_message");
        key_list.add("open_cabinet");
        key_list.add("douyin_order_sync");
        for (String key : key_list) {
            // 获取smt_print_grp中的pending消息信息，本质上就是执行XPENDING指令
            PendingMessagesSummary pendingMessagesSummary = streamOperations.pending(key, key + "_grp");

            if (pendingMessagesSummary != null) {
                // 所有pending消息的数量
                long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
                if (totalPendingMessages > 0) {
                    // 消费组名称
                    String groupName = pendingMessagesSummary.getGroupName();

                    // pending队列中的最小ID
                    String minMessageId = pendingMessagesSummary.minMessageId();

                    // pending队列中的最大ID
                    String maxMessageId = pendingMessagesSummary.maxMessageId();

                    log.info("消费组：{}，一共有{}条pending消息，最大ID={}，最小ID={}", groupName, totalPendingMessages, minMessageId,
                            maxMessageId);

                    // 每个消费者的pending消息数量
                    Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary
                            .getPendingMessagesPerConsumer();

                    pendingMessagesPerConsumer.entrySet().forEach(entry -> {

                        // 消费者
                        String consumer = entry.getKey();
                        // 消费者的pending消息数量
                        long consumerTotalPendingMessages = entry.getValue();

                        log.info("消费者：{}，一共有{}条pending消息", consumer, consumerTotalPendingMessages);

                        if (consumerTotalPendingMessages > 0) {
                            // 读取消费者pending队列的前10条记录，从ID=0的记录开始，一直到ID最大值
                            PendingMessages pendingMessages = streamOperations.pending(key,
                                    Consumer.from(key + "_grp", consumer), Range.closed("0", "+"), 10);

                            // 遍历所有Opending消息的详情
                            pendingMessages.forEach(message -> {
                                // 消息的ID
                                RecordId recordId = message.getId();
                                // 消息从消费组中获取，到此刻的时间
                                Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
                                // 消息被获取的次数
                                long deliveryCount = message.getTotalDeliveryCount();

                                log.info("openg消息，id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId,
                                        elapsedTimeSinceLastDelivery, deliveryCount);

                                /**
                                 * 演示手动消费的这个判断非常的针对，目的就是要读取消费者“my_consumer1”pending消息中，ID=1605524665215-0的这条消息
                                 */
                                // if (consumer.equals("my_consumer1") &&
                                // recordId.toString().equals("1605524665215-0")) {
                                // 通过streamOperations，直接读取这条pending消息，
                                List<MapRecord<String, String, String>> result = streamOperations.range(key,
                                        Range.closed(recordId.toString(), recordId.toString()));

                                if (result != null && result.size() > 0) {
                                    // 开始和结束都是同一个ID，所以结果只有一条
                                    MapRecord<String, String, String> record = result.get(0);

                                    // 这里执行日志输出，模拟的就是消费逻辑
                                    log.info("消费了pending消息：id={}, value={}", record.getId(), record.getValue());

                                    // 如果手动消费成功后，往消费组提交消息的ACK
                                    Long retVal = streamOperations.acknowledge(key + "_grp", record);
                                    log.info("消息ack，一共ack了{}条", retVal);
                                    // }

                                    streamOperations.delete(record);
                                }

                            });
                        }
                    });
                }
            }
        }
    }
}
